Skip to content

[flink] Expose scan.bucket for single-bucket manifest pruning#8117

Open
wwj6591812 wants to merge 1 commit into
apache:masterfrom
wwj6591812:add_scan_bucket_0604
Open

[flink] Expose scan.bucket for single-bucket manifest pruning#8117
wwj6591812 wants to merge 1 commit into
apache:masterfrom
wwj6591812:add_scan_bucket_0604

Conversation

@wwj6591812

Copy link
Copy Markdown
Contributor

Background

ReadBuilder.withBucket(int) and manifest scanning already support reading a single bucket, but Flink SQL had no connector option to expose it. Operators often need to debug or scan one bucket of a fixed-bucket primary-key table without reading all buckets.

Why this PR

Expose scan.bucket in Flink so users can run:

SELECT * FROM t /*+ OPTIONS('scan.bucket' = '0') */

and plan splits only for that bucket.

What changes

  • Add FlinkConnectorOptions.SCAN_BUCKET (scan.bucket).
  • ScanBucketUtils.applyScanBucket() reads the option and calls ReadBuilder.withBucket().
  • Wire into FlinkSourceBuilder and FlinkTableSource (batch and split inference).
  • Validate in ReadBuilderImpl.withBucket() (canonical read path): non-negative bucket id, FileStoreTable only, not postpone-bucket mode, bucket < table.bucket when table bucket > 0.

Stage optimized: scan / manifest planning — fewer manifest entries and splits before read. No change to merge or per-record logic.

Tests

  • ScanBucketUtilsTest — invalid bucket id fails fast.
  • ScanBucketITCase — SQL with scan.bucket matches reading that bucket via the table API.

Test plan

  • mvn test -pl paimon-flink/paimon-flink-common -am -Dtest=ScanBucketUtilsTest,ScanBucketITCase

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 17c2722 to 7e8c5d8 Compare June 4, 2026 09:45
@wwj6591812

Copy link
Copy Markdown
Contributor Author

The failed test is not related to my modifications.

@JingsongLi

Copy link
Copy Markdown
Contributor

The validation here still allows scan.bucket on non-fixed-bucket tables.

validateSpecifiedBucket rejects postpone bucket tables, but it does not require a fixed bucket mode or bucket > 0. For bucket-unaware/dynamic bucket tables, CoreOptions.bucket() can be <= 0, so the upper-bound check is skipped and the scan proceeds with physical bucket pruning. That can turn an invalid configuration into an empty/incorrect result instead of a clear error. The generated doc says this option is only supported for fixed-bucket primary-key tables (bucket > 0).

Could we enforce that here, e.g. require the fixed bucket mode and configured bucket count > 0, and also check primary-key-ness if the option is intended only for primary-key tables?

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 7e8c5d8 to 9b67691 Compare June 8, 2026 02:30
@wwj6591812

Copy link
Copy Markdown
Contributor Author

The validation here still allows scan.bucket on non-fixed-bucket tables.

validateSpecifiedBucket rejects postpone bucket tables, but it does not require a fixed bucket mode or bucket > 0. For bucket-unaware/dynamic bucket tables, CoreOptions.bucket() can be <= 0, so the upper-bound check is skipped and the scan proceeds with physical bucket pruning. That can turn an invalid configuration into an empty/incorrect result instead of a clear error. The generated doc says this option is only supported for fixed-bucket primary-key tables (bucket > 0).

Could we enforce that here, e.g. require the fixed bucket mode and configured bucket count > 0, and also check primary-key-ness if the option is intended only for primary-key tables?

Hi, thanks for your review.
I have updated code, now alidateSpecifiedBucket requires HASH_FIXED bucket mode, primary keys, and bucket > 0. Added core UT (ReadBuilderImplTest) and extended Flink UT/IT for invalid table types.

Please CC, Thx.
@JingsongLi

if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
ScanBucketUtils.applyScanBucket(table, readBuilder, conf);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies scan.bucket only to the normal source read builder. Aggregate pushdown plans splits through AggregatePushDownUtils.planSplits(...) with a separate ReadBuilder, so queries such as SELECT COUNT() FROM T /+ OPTIONS(scan.bucket=0) */ can still aggregate all buckets while non-aggregate reads scan only the requested bucket. Please either apply SCAN_BUCKET in the aggregate pushdown planning path as well, or disable aggregate pushdown when scan.bucket is set.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @JingsongLi.
I have applied ScanBucketUtils.applyScanBucket in AggregatePushDownUtils.planSplits(...) as well, so the aggregate pushdown planning path now respects the scan.bucket option too.

"Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
fileStoreTable.bucketMode());
checkArgument(
!fileStoreTable.schema().primaryKeys().isEmpty(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the public core ReadBuilder.withBucket(...) API reject fixed-bucket append tables, even though bucket-level manifest pruning is useful and valid there too. If the primary-key restriction is only meant for the Flink scan.bucket option, could we keep ReadBuilder.withBucket(...) generic and enforce the Flink option restriction in the Flink scan.bucket path instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @JingsongLi.
I have removed the primary-key restriction from ReadBuilderImpl.validateSpecifiedBucket(...) to keep the core ReadBuilder.withBucket(...) API generic for fixed-bucket append tables. The primary-key check is now enforced inside ScanBucketUtils.applyScanBucket(...), which is the Flink scan.bucket option path. I also updated ReadBuilderImplTest accordingly.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch 3 times, most recently from 3b01bc0 to 9793995 Compare June 10, 2026 06:50

List<Row> expected = readRowsFromBucket(table, targetBucket);

List<Row> actual =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a regression assertion for the aggregate-pushdown path here? The earlier bug was that SELECT COUNT(*) FROM T /*+ OPTIONS('scan.bucket'='...') */ planned splits through AggregatePushDownUtils.planSplits, separate from the normal source path covered by SELECT *. The new ScanBucketUtils.applyScanBucket call in AggregatePushDownUtils fixes that path, but without an aggregate query test it can regress while this IT still passes. A small COUNT(*) (or MIN/MAX) assertion against the same bucket rows would cover it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi

Thanks for the suggestion — you're right that the existing IT only covered the normal source read path via SELECT *, while the earlier bug was specifically in aggregate pushdown planning through AggregatePushDownUtils.planSplits.

I've extended testScanBucketFilter with a SELECT COUNT() FROM T /+ OPTIONS('scan.bucket' = '...') */ assertion and verified it matches the row count from the same bucket read via the table API. This should prevent regressions on the aggregate pushdown path while the SELECT * test continues to cover the normal source path. Please take another look when you have a chance.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 9793995 to 7edcb6a Compare June 16, 2026 02:01
+ "normal source, the max partition(s) will be determined before job running "
+ "without refreshing even for streaming jobs.");

public static final ConfigOption<Integer> SCAN_BUCKET =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this option just be in CoreOptions? And Flink does need to do anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — scan.bucket is a core scan option, not Flink-specific. I moved SCAN_BUCKET to CoreOptions and let ReadBuilderImpl apply it automatically from table options (via withBucket()), so Flink no longer needs ScanBucketUtils or connector-level handling. Validation tests were moved to ReadBuilderImplTest, and the existing Flink ITCase still covers end-to-end behavior through SQL hints.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 7edcb6a to 04a22bc Compare June 23, 2026 10:18
@@ -245,13 +246,13 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
protected void scanSplitsForInference() {
if (splitStatistics == null) {
if (table instanceof DataTable) {
List<PartitionEntry> partitionEntries =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert changes in this class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the changes in FlinkTableSource were only needed to wire up ScanBucketUtils, which is no longer required now that scan.bucket lives in CoreOptions and is applied automatically by ReadBuilderImpl. I reverted FlinkTableSource to its original form.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 04a22bc to 33e8b5e Compare June 24, 2026 01:17
this.defaultPartitionName = new CoreOptions(table.options()).partitionDefaultName();
CoreOptions coreOptions = new CoreOptions(table.options());
this.defaultPartitionName = coreOptions.partitionDefaultName();
Integer scanBucket = coreOptions.scanBucket();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scan.bucket is now a CoreOptions scan option, but it is only consumed by ReadBuilderImpl. Core scan options such as scan.snapshot-id are also honored when callers do table.copy(dynamicOptions).newScan().plan(), and there are existing core tests/users on that path. With this implementation, table.copy(Map.of("scan.bucket", "0")).newScan().plan() still scans all buckets while table.copy(...).newReadBuilder().newScan().plan() scans one bucket, so the same table options produce different results depending on whether the caller goes through ReadBuilder. Please apply/validate the option in the DataTableScan/SnapshotReader path as well, or keep it out of CoreOptions if it is intended to be ReadBuilder-only.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @JingsongLi.

You're right — scan.bucket was only applied in ReadBuilderImpl, so table.copy(Map.of("scan.bucket", "0")).newScan().plan() still scanned all buckets while other scan options worked on the direct scan path.

I've moved the option handling into AbstractDataTableScan: the constructor now reads scan.bucket from CoreOptions, validates it via ReadBuilderImpl.validateScanBucketOption(), and applies it to SnapshotReader.withBucket(). This makes table.newScan() and table.copy(...).newScan() behave consistently with other core scan options.

Added tests for the direct scan path (testScanBucketOptionViaDirectTableScan, testScanBucketOptionViaTableCopy, and rejection cases). Please take another look when you have time.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 33e8b5e to 6af1076 Compare June 24, 2026 08:18
@@ -97,6 +97,15 @@ protected AbstractDataTableScan(
this.options = options;
this.snapshotReader = snapshotReader;
this.queryAuth = queryAuth;
applyScanBucketOption();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use withBucket and add validation in validateScanBucketOption.

Do not modify ReadBuilderImpl.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi Thanks for the review. I've updated the implementation as suggested:

1、Removed the constructor-time applyScanBucketOption() from AbstractDataTableScan.

2、Moved validation into AbstractDataTableScan.withBucket() via validateScanBucketOption(), which checks primary-key fixed-bucket tables and valid bucket range before applying manifest pruning.

3、Applied the scan.bucket table option in AbstractFileStoreTable.newScan() / newStreamScan() by calling withBucket(), instead of wiring it implicitly in the scan constructor.

4、Reverted all changes to ReadBuilderImpl — it is now identical to master.

Unit tests were moved from ReadBuilderImplTest to ScanBucketTest, and the existing Flink ScanBucketITCase remains unchanged. Please take another look when you have a chance.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 6af1076 to e5815f5 Compare June 29, 2026 06:56
@wwj6591812

Copy link
Copy Markdown
Contributor Author

@JingsongLi
Hi,Please CC, Thx

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants